package com.dahuan.state;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class State_KeyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism( 1 );
        //TODO 基于事件时间
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );
        //水印之间的间隔（以毫秒为单位）
        env.getConfig().setAutoWatermarkInterval( 100 );

        DataStream<String> inputStream = env.socketTextStream( "localhost", 7777 );
        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );
        } );

        //TODO 定义一个有状态的map操作 , 统计当前分区数据个数
        SingleOutputStreamOperator<Integer> resultStream = dataStream.keyBy( "id" )
                .map( new MyKeyCountMapper() );

        resultStream.print();

        env.execute( "State_OperatorState" );
    }

    //自定义RichMapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {

        //声明一个状态
        private ValueState<Integer> keyCountState;

        //TODO 其他类型状态的声明
        private ListState<String> myListState;

        //定义一个MapState状态
        private MapState<String, Double> myMapState;

        //
        private ReducingState<SensorReading> myReducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            //设置状态
            keyCountState = getRuntimeContext().getState( new ValueStateDescriptor<Integer>( "key-count", Integer.class, 0 ) );
            myListState = getRuntimeContext().getListState( new ListStateDescriptor<String>( "my-list", String.class ) );
            myMapState = getRuntimeContext().getMapState( new MapStateDescriptor<String, Double>( "my-map", String.class, Double.class ) );
            //myReducingState = getRuntimeContext().getReducingState( new ReducingStateDescriptor<SensorReading>("my-ReducingState",SensorReading.class));
        }

        @Override
        public Integer map(SensorReading value) throws Exception {


            //TODO 其他状态API调用
            //List 返回状态的当前值
            for (String str : myListState.get()) {
                System.out.println( str );
            }
            myListState.add( "hello" );

            //TODO map state
            myMapState.get( "1" );
            //添加
            myMapState.put( "2",12.3 );
            //删除
            myMapState.remove( "2" );


            //TODO reducing state
            myReducingState.add( value );
            //清除状态
            myReducingState.clear();




            //TODO 获取状态的值
            Integer count = keyCountState.value();
            //自增
            count++;
            //将自增的值放到状态里
            keyCountState.update( count );
            //返回自增结果
            return count;

        }
    }
}
